/*
 * File    : ListenerManager.java
 * Created : 15-Jan-2004
 * By      : parg
 * 
 * Azureus - a Java Bittorrent client
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details ( see the LICENSE file ).
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */

package org.gudy.azureus2.core3.util;

/**
 * @author parg
 *
 */

/**
 * This class exists to support the invocation of listeners while *not* synchronized.
 * This is important as in general it is a bad idea to invoke an "external" component
 * whilst holding a lock on something as unexpected deadlocks can result.
 * It has been introduced to reduce the likelyhood of such deadlocks
 */

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

//import org.gudy.azureus2.core3.logging.LogEvent;
//import org.gudy.azureus2.core3.logging.LogIDs;
//import org.gudy.azureus2.core3.logging.Logger;


public class 
ListenerManager<T>
{
	private static final boolean TIME_LISTENERS = false;
	
	public static <T>ListenerManager<T>
	createManager(
		String										name,
		ListenerManagerDispatcher<T>				target )
	{
		return( new ListenerManager<T>( name, target, false ));
	}
	
	public static <T>ListenerManager<T>
	createAsyncManager(
		String							name,
		ListenerManagerDispatcher<T>	target )
	{
		return( new ListenerManager<T>( name, target, true ));
	}
	
	
	protected String	name;
	
	protected ListenerManagerDispatcher<T>				target;
	protected ListenerManagerDispatcherWithException	target_with_exception;
	
	protected boolean	async;
	protected AEThread2	async_thread;
	
	protected List<T>			listeners		= new ArrayList<T>(0);
	
	protected List<Object[]>	dispatch_queue;
//	protected AESemaphore		dispatch_sem;
	
	protected
	ListenerManager(
		String								_name,
		ListenerManagerDispatcher<T>		_target,
		boolean								_async )
	{
		name	= _name;
		target	= _target;
		async	= _async;
		
		if ( target instanceof ListenerManagerDispatcherWithException ){
			
			target_with_exception = (ListenerManagerDispatcherWithException)target;
		}
		
		if ( async ){
			
//			dispatch_sem	= new AESemaphore("ListenerManager::"+name);
			dispatch_queue 	= new LinkedList<Object[]>();
			
			if ( target_with_exception != null ){
				
				throw( new RuntimeException( "Can't have an async manager with exceptions!"));
			}
		}
	}
	
	public void
	addListener(
		T		listener )
	{
		synchronized( this ){
			
			ArrayList<T>	new_listeners	= new ArrayList<T>( listeners );
			
			if (new_listeners.contains(listener)) {
//				Logger.log(new LogEvent(LogIDs.CORE, LogEvent.LT_WARNING,
//						"addListener called but listener already added for " + name
//								+ "\n\t" + Debug.getStackTrace(true, false)));
			}
			new_listeners.add( listener );
			
			if (new_listeners.size() > 50) {
//				Logger.log(new LogEvent(LogIDs.CORE, LogEvent.LT_WARNING,
//						"addListener: over 50 listeners added for " + name
//								+ "\n\t" + Debug.getStackTrace(true, false)));
			}
			
			listeners	= new_listeners;
			
			if ( async && async_thread == null ){
				
				async_thread = new AEThread2( name, true )
					{
						public void
						run()
						{
							dispatchLoop();
						}
					};
									
				async_thread.start();
			}
		}
	}
	
	public void
	removeListener(
		Object		listener )
	{
		synchronized( this ){
			
			ArrayList<T>	new_listeners = new ArrayList<T>( listeners );
			
			new_listeners.remove( listener );
			
			listeners	= new_listeners;
			
			if ( async && listeners.size() == 0 ){
				
				async_thread = null;
				
					// try and wake up the thread so it kills itself
				
//				dispatch_sem.release();
			}
		}
	}
	
	public boolean
	hasListener(
		T		listener )
	{
		synchronized( this ){

			return( listeners.contains( listener ));
		}
	}
	
	public void
	clear()
	{
		synchronized( this ){
									
			listeners	= new ArrayList<T>();
			
			if ( async ){
				
				async_thread = null;
				
					// try and wake up the thread so it kills itself
				
//				dispatch_sem.release();
			}
		}
	}
	
	public List<T>
	getListenersCopy()
	{
			// we can just return the listeners as we copy on update
				
		return( listeners );
	}
	
	public void
	dispatch(
		int		type,
		Object	value )
	{
		dispatch( type, value, false );
	}
	
	public void
	dispatch(
		int			type,
		Object		value,
		boolean		blocking )
	{
		if ( async ){
			
//			AESemaphore	sem = null;
			
			if ( blocking ){
				
//				sem = new AESemaphore( "ListenerManager:blocker");
			}
			
			synchronized( this ){
				
					// if there's nobody listening then no point in queueing 
				
				if ( listeners.size() == 0 ){
						
					return;
				}
				
					// listeners are "copy on write" updated, hence we grab a reference to the 
					// current listeners here. Any subsequent change won't affect our listeners
												
//				dispatch_queue.add(new Object[]{listeners, new Integer(type), value, sem });
			}
			
//			dispatch_sem.release();
			
//			if ( sem != null ){
//
//				sem.reserve();
//			}
		}else{
			
			if ( target_with_exception != null ){
				
				throw( new RuntimeException( "call dispatchWithException, not dispatch"));
			}
			
			List<T>	listeners_ref;
			
			synchronized( this ){
				
				listeners_ref = listeners;				
			}	
			
			try{
				dispatchInternal( listeners_ref, type, value );
				
			}catch( Throwable e ){
				
//				Debug.printStackTrace( e );
			}
		}
	}	
	
	public void
	dispatchWithException(
		int		type,
		Object	value )
	
		throws Throwable
	{
		List<T>	listeners_ref;
		
		synchronized( this ){
			
			listeners_ref = listeners;			
		}
		
		dispatchInternal( listeners_ref, type, value );
	}
	
	public void
	dispatch(
		T		listener,
		int		type,
		Object	value )
	{
		dispatch( listener, type, value, false );
	}
	
	public void
	dispatch(
		T		listener,
		int		type,
		Object	value,
		boolean	blocking )
	{
		if ( async ){
			
//			AESemaphore	sem = null;
			
			if ( blocking ){
				
//				sem = new AESemaphore( "ListenerManager:blocker");
			}
	
			synchronized( this ){
								
					// 5 entries to denote single listener
				
//				dispatch_queue.add(new Object[]{ listener, new Integer(type), value, sem, null });
				
				if ( async_thread == null ){
					
					async_thread = new AEThread2( name, true )
						{
							public void
							run()
							{
								dispatchLoop();
							}
						};
											
					async_thread.start();
				}
			}
			
//			dispatch_sem.release();
	
//			if ( sem != null ){
//
//				sem.reserve();
//			}
		}else{
			
			if ( target_with_exception != null ){
				
				throw( new RuntimeException( "call dispatchWithException, not dispatch"));
			}
			
			doDispatch( listener, type, value );
		}
	}

	protected String
	getListenerName( 
		T	 listener )
	{
		Class listener_class = listener.getClass();
		
		String	res = listener_class.getName();
		
		try{
			Method getString = listener_class.getMethod( "getString", new Class[0]);
			
			if ( getString != null ){
				
				String s = (String)getString.invoke( listener, new Object[0] );
				
				res += " (" + s + ")";
			}
		}catch( Throwable e ){
			
		}
		
		return( res );
	}
	
	protected void
	doDispatch(
		T			listener,
		int			type,
		Object		value )
	{
		try{
			if ( TIME_LISTENERS ){

				long	start = SystemTime.getCurrentTime();
				
				try{

					target.dispatch( listener, type, value );

				}finally{
					
					long duration = SystemTime.getCurrentTime() - start;
					
					System.out.println( name + "/" + type + ": " + getListenerName( listener ) + " - " + duration );
				}
			}else{
			
				target.dispatch( listener, type, value );
			}
			
		}catch( Throwable e ){
			
//			Debug.printStackTrace( e );
		}
	}
	
	protected void
	doDispatchWithException(
		T			listener,
		int			type,
		Object		value )
	
		throws Throwable
	{
		if ( TIME_LISTENERS ){

			long	start = SystemTime.getCurrentTime();
			
			try{
				target_with_exception.dispatchWithException( listener, type, value );

			}finally{
				
				long duration = SystemTime.getCurrentTime() - start;
				
				System.out.println( name + "/" + type + ": " + getListenerName( listener ) + " - " + duration );
			}
		}else{
		
			target_with_exception.dispatchWithException( listener, type, value );
		}
	}
	
	protected void
	dispatchInternal(
		List<T>		listeners_ref,
		int			type,
		Object		value )
	
		throws Throwable
	{		
		for (int i=0;i<listeners_ref.size();i++){
		
			
			if ( target_with_exception != null ){
					
				// System.out.println( name + ":dispatchWithException" );
				
					// DON'T catch and handle exceptions here are they are permitted to
					// occur!
				
				doDispatchWithException( listeners_ref.get(i), type, value );
					
			}else{
			
				doDispatch( listeners_ref.get(i), type, value );
			}
		}
	}
	
	protected void
	dispatchInternal(
		T			listener,
		int			type,
		Object		value )
	
		throws Throwable
	{		
		if ( target_with_exception != null ){
				
				// System.out.println( name + ":dispatchWithException" );
				
				// DON'T catch and handle exceptions here are they are permitted to
				// occur!

			doDispatchWithException( listener, type, value );
				
		}else{
			
			doDispatch( listener, type, value );
		}
	}
	
	public void
	dispatchLoop()
	{
		// System.out.println( "ListenerManager::dispatch thread '" + Thread.currentThread() + "' starts");
		
		while(true){
			
//			dispatch_sem.reserve();
			
			Object[] data = null;
			
			synchronized( this ){
				
				if ( async_thread == null || !async_thread.isCurrentThread()){
					
						// we've been asked to close. this sem reservation must be
						// "returned" to the pool in case it represents a valid  entry
						// to be picked up by another thread
					
//					dispatch_sem.release();
					
					break;
				}
				
				if ( dispatch_queue.size() > 0 ){
					
					data = (Object[])dispatch_queue.remove(0);
				}
			}
			
			if ( data != null ){
			
				try{						
					if ( data.length == 4 ){
					
						dispatchInternal((List<T>)data[0], ((Integer)data[1]).intValue(), data[2] );
						
					}else{
						
						dispatchInternal((T)data[0], ((Integer)data[1]).intValue(), data[2] );
					}
					
				}catch( Throwable e ){
					
//					Debug.printStackTrace( e );
					
				}finally{
					
					if ( data[3] != null ){
						
//						((AESemaphore)data[3]).release();
					}
				}
			}
		}
		
		// System.out.println( "ListenerManager::dispatch thread '" + Thread.currentThread() + "' ends");
	}
	
	public static <T>void
	dispatchWithTimeout(
		List<T>								_listeners,
		final ListenerManagerDispatcher<T>	_dispatcher,
		long								_timeout )
	{
		final List<T>	listeners = new ArrayList<T>( _listeners );
		
		final boolean[]	completed = new boolean[listeners.size()];
		
//		final AESemaphore	timeout_sem = new AESemaphore("ListenerManager:dwt:timeout");
		
		for (int i=0;i<listeners.size();i++){
			
			final int f_i	= i;
						
			new AEThread2( "ListenerManager:dwt:dispatcher", true ){
				public void
				run()
				{
					try{
						_dispatcher.dispatch( listeners.get(f_i), -1, null );
						
					}catch( Throwable e ){
						
//						Debug.printStackTrace(e);
						
					}finally{
						
						completed[f_i]	= true;
						
//						timeout_sem.release();
					}
				}
			}.start();
		}
		
		boolean	timeout_occurred = false;
		
		for (int i=0;i<listeners.size() ;i++){
			
			if ( _timeout <= 0 ){
				
				timeout_occurred	= true;
				
				break;
			}
			
			long start = SystemTime.getCurrentTime();
			
//			if ( !timeout_sem.reserve( _timeout )){
//
//				timeout_occurred	= true;
//
//				break;
//			}
			
			long end = SystemTime.getCurrentTime();

//			if ( end > start ){
//
//				_timeout = _timeout - ( end - start );
//			}
		}
		
		if ( timeout_occurred ){
			
			String	str = "";
			
			for (int i=0;i<completed.length;i++){
			
				if ( !completed[i] ){
					
					str += (str.length()==0?"":",") + listeners.get(i);
				}
			}
			
			if ( str.length() > 0 ){
				
//				Debug.out( "Listener dispatch timeout: failed = " + str );
			}
		}
	}
	
	public long size() {
		if (listeners == null)
			return 0;

		return listeners.size();
	}
}

